/**
 * Project: dx.zk.client
 * 
 * File Created at 2016年4月12日
 * 
 * Copyright 2015-2015 dx.com Croporation Limited.
 * All rights reserved.
 *
 * This software is the confidential and proprietary information of
 * DongXue software Company. ("Confidential Information").  You shall not
 * disclose such Confidential Information and shall use it only in
 * accordance with the terms of the license agreement you entered into
 * with dx.com.
 */
package com.dx.zkClient.opt;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.commons.lang.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;

import com.dx.zkClient.ZKClient;

/** 
* @ClassName: ZkOperate 
* @Description: zookeeper操作类
* inBackground()则代表不需要返回直接异步
* @author wuzhenfang(wzfbj2008@163.com)
* @date 2016年4月12日 下午6:41:59 
* @version V1.0 
*/
public class ZkOperate {

	/**
	 * 创建临时节点 临时的下面不能有孩子
	 * 并递归创建父节点
	 * @param path
	 * @param data
	 * @return
	 * @throws Exception
	 */
	public String createEphemeralNode(String path,byte[] data) throws Exception{
		path = valiPath(path);
		return getClient().create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, data);
	}
	/**
	 * 创建节点，并递归创建父节点
	 * CreateMode: PERSISTENT 持久化节点
	 *             /PERSISTENT_SEQUENTIAL 顺序自动编号持久化节点，这种节点会根据当前已存在的节点数自动加 1
	 *             /EPHEMERAL临时节点， 客户端session超时这类节点就会被自动删除 临时的下面不能有孩子
	 *             /EPHEMERAL_SEQUENTIAL   临时自动编号节点
	 * creatingParentsIfNeeded://如果指定的节点的父节点不存在，递归创建父节点             
	 * @param path
	 * @param data
	 * @return
	 * @throws Exception
	 */
	public String createNode(String path,byte[] data) throws Exception{
		path = valiPath(path);
		//判断节点是否存在
		if(checkExists(path)){
			setNodeData(path, data);
			return path;
		}else{
			return getClient().create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path, data);
		}
	}
	
	/**
	 * 删除节点
	 * @param path
	 * @return
	 * @throws Exception
	 */
	public boolean deleteNode(String path) throws Exception {
		path = valiPath(path);
		// 判断节点是否存在
		if (checkExists(path)) {
			//删除给定的节点，并保证它完成
			getClient().delete().guaranteed().inBackground().forPath(path);
		}else{
			return false;
		}
		return true;
	}
	
	/**
	 * 删除该节点，包含孩子节点(并递归删除父节点)
	 * guaranteed 保证删除
	 * @param path
	 * @return
	 * @throws Exception
	 */
	public boolean deleteNodeIncludChildrenNode(String path) throws Exception{
		path = valiPath(path);
		// 判断节点是否存在
		if (checkExists(path)) {
			getClient().delete().guaranteed().deletingChildrenIfNeeded().inBackground().forPath(path);
		}else{
			return false;
		}
		return true;
	}
	
	/**
	 * 设置节点数据
	 * @param path
	 * @param data
	 * @return
	 * @throws Exception
	 */
	public boolean setNodeData(String path,byte[] data) throws Exception{
		path = valiPath(path);
		if (checkExists(path)) {
			getClient().setData().inBackground().forPath(path, data);
		}else{
			return false;
		}
		return true;
	}
	
	/**
	 * 获取当前路径下的数据信息
	 * @param path
	 * @return
	 * @throws Exception
	 */
	public byte[] getNodeData(String path) throws Exception{
		path = valiPath(path);
		if (checkExists(path)) {
			return getClient().getData().storingStatIn(new Stat()).forPath(path);
		}else{
			return null;
		}
	}
	
	/**
	 * 获得该节点下的所有的孩子节点的路径
	 * @param path
	 * @return
	 * @throws Exception
	 */
	public List<String> getChildNode(String path) throws Exception{
		path = valiPath(path);
		if (checkExists(path)) {
			List<String> childrenPath = getClient().getChildren().forPath(path);
			return childrenPath;
		}else{
			return null;
		}
	}
	
	/**
	 * 获取该节点下的孩子节点数
	 * @param path
	 * @return
	 * @throws Exception
	 */
	public int countChildren(String path) throws Exception{
		List<String> childrenPath = getChildNode(path);
		if(childrenPath != null){
			return childrenPath.size();
		}else{
			return 0;
		}
	}
	
	/**
	 * 对某个节点的数据添加一个watch
	 * @param path
	 * @param watcher
	 * @return
	 * @throws Exception
	 */
	public byte[] setWatcher(String path,Watcher watcher) throws Exception{
		path = valiPath(path);
		if (checkExists(path)){
			return getClient().getData().usingWatcher(watcher).inBackground().forPath(path);
		}else{
			return null;
		}
	}
	
	/**
	 * =============================================================
	 * curator 引入了cache来实现对Zookeeper服务端事件的监听。
	 * cache是curator中对事件监听的包装，其对事件的监听其实可以近似的看作是
	 * 一个本地缓存视图和远程zookeeper视图的对比过程。
	 * 同时curator能够自动为开发人员处理反复注册监听。
	 * cache分为两类监听：节点监听和子节点监听
	 * =============================================================
	 */
	
	/**
	 * 对某数据节点设置节点监听,可以监听节点内容变更，也可监听指定节点是否存在
	 * 1、节点数据改变触发监听
	 * 2、节点不存在创建候触发监听
	 * 3、节点被删除，则无法触发监听
	 * @param path
	 * @param listener 如何设置监听
	 * new NodeCacheListener(){
	 *     public void nodeChanged() throws Exception {
	 *			System.out.println("Node data uodate,new data: "+new String(cache.getCurrentData().getData()));
	 *		}
	 *	};
	 * @return
	 * @throws Exception 
	 */
	@SuppressWarnings("resource")
	public boolean setNodeCacheListener(String path, NodeCacheListener listener) throws Exception {
		path = valiPath(path);
		final NodeCache cache = new NodeCache(getClient(),path,false);//参数说明：curator客户端、数据节点的节点路径，是否进行数据压缩
		cache.start(true);//默认是false，true则nodeCache在第一次启动时，就会立刻从zookeeper上读取对应节点数据内容，并保存在cache中。
		cache.getListenable().addListener(listener);
		cache.start();
		return true;
	}
	
	/**
	 * 添加监听子节点。注意：curator无法对二级子节点进行事件监听
	 * 1、新增子节点
	 * 2、子节点数据变更
	 * 3、子节点删除
	 * @param path 当前节点路径
	 * @param listener 监听
	 * new PathChildrenCacheListener() {
	 *			public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
	 *				switch (event.getType()) {
	 *				case CHILD_ADDED://节点添加
	 *					event.getData().getPath();//添加的孩子节点路径
	 *					break;
	 *				case CHILD_UPDATED://节点更新
	 *					event.getData().getPath();//更新的孩子节点路径
	 *					break;
	 *				case CHILD_REMOVED://节点移除
	 *					event.getData().getPath();//移除的孩子节点路径
	 *					break;
	 *				default:
	 *					break;
	 *				}
	 *			}
	 *		};
	 * @return
	 * @throws Exception
	 */
	@SuppressWarnings("resource")
	public boolean setChildrenNodeCacheListner(String path, PathChildrenCacheListener listener) throws Exception{
		path = valiPath(path);
		if(checkExists(path)){
			PathChildrenCache cache = new PathChildrenCache(getClient(), path, true);//参数说明：客户端实例/节点路径/节点内容缓存（true在数据变更时能获取节点数据内容，false则不能）/是否数据压缩
			cache.start(StartMode.POST_INITIALIZED_EVENT);
			cache.getListenable().addListener(listener);
			cache.start();
			return true;
		}
		return false;
	}
	
	
	/**
	 * Path Cache和Node Cache的“合体”，监视路径下的创建、更新、删除事件，并缓存路径下所有孩子结点的数据
	 * @param path
	 * @param listener
	 * new TreeCacheListener() {
	 *		public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
	 *			switch (event.getType()) {
	 *			case NODE_ADDED:
	 *				String strList1 = event.getData().getPath();
	 *				System.out.println("返回结果：" + strList1);
	 *				break;
	 *			case NODE_REMOVED:
	 *				String strList2 = event.getData().getPath();
	 *				System.out.println("返回结果：" + strList2);
	 *				break;
	 *			case NODE_UPDATED:
	 *				String strList3 = event.getData().getPath();
	 *				System.out.println("返回结果：" + strList3);
	 *				break;
	 *			default:
	 *				break;
	 *			}
	 *		}
	 *	};
	 * @return
	 * @throws Exception 
	 */
	@SuppressWarnings("resource")
	public boolean setTreeCacheListner(String path, TreeCacheListener listener) throws Exception{
		path = valiPath(path);
		ExecutorService pool = Executors.newFixedThreadPool(2);
		final TreeCache treeCache = new TreeCache(getClient(), path);
		treeCache.getListenable().addListener(listener,pool);
		treeCache.start();
		return true;
	}
	/**
	 * 判断该路径是否存在
	 * @param path
	 * @return
	 * @throws Exception
	 */
	public boolean checkExists(String path) throws Exception{
		if(getClient().checkExists().creatingParentContainersIfNeeded().forPath(path) != null){
			return true;
		}
		return false;
	}
	
	/**
	 * 规范节点路径
	 * @param path
	 * @return
	 */
	private static String valiPath(String path){
		if(StringUtils.isEmpty(path)){
			throw new NullPointerException("path路径为空.");
		}
		if(path.startsWith("/") && !path.endsWith("/")){
			return path;
		}
		StringBuffer uPath = new StringBuffer();
		if(!path.startsWith("/")){
			uPath.append("/");
		}
		if(path.endsWith("/")){
			path = path.substring(0, path.length()-1);
		}
		uPath.append(path);
		return path.toString();
	}

	
	public static void main(String[] args) {
		long time2 = System.nanoTime();
		System.out.println(valiPath("/aa/nn/"));
		long time21 = System.nanoTime();
		System.out.println(time21 - time2);
		
		long time3 = System.nanoTime();
		System.out.println(valiPath("aa/nn/"));
		long time31 = System.nanoTime();
		System.out.println(time31 - time3);
		
		long time4 = System.nanoTime();
		System.out.println(valiPath("/aa/nn"));
		long time41 = System.nanoTime();
		System.out.println(time41 - time4);
	}
	
	/**
	 * 获取ZK客户端
	 * @return the client
	 */
	public CuratorFramework getClient() {
		return ZKClient.getInstance();
	}
	
	/**
	 * 关闭客户端
	 */
	public void close(){
		ZKClient.close();
	}
}
